# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.

"""Evaluation score abstract class and implementations.
"""

from __future__ import division
import logging
import os
import re
import subprocess
import sys

try:
  import numpy as np
except ImportError:
  logging.critical('Cannot import the third-party Python package numpy')
  sys.exit(1)

from . import data_access
from . import exceptions
from . import signal_processing


class EvaluationScore(object):

  NAME = None
  REGISTERED_CLASSES = {}

  def __init__(self, score_filename_prefix):
    self._score_filename_prefix = score_filename_prefix
    self._input_signal_metadata = None
    self._reference_signal = None
    self._reference_signal_filepath = None
    self._tested_signal = None
    self._tested_signal_filepath = None
    self._output_filepath = None
    self._score = None
    self._render_signal_filepath = None

  @classmethod
  def RegisterClass(cls, class_to_register):
    """Registers an EvaluationScore implementation.

    Decorator to automatically register the classes that extend EvaluationScore.
    Example usage:

    @EvaluationScore.RegisterClass
    class AudioLevelScore(EvaluationScore):
      pass
    """
    cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
    return class_to_register

  @property
  def output_filepath(self):
    return self._output_filepath

  @property
  def score(self):
    return self._score

  def SetInputSignalMetadata(self, metadata):
    """Sets input signal metadata.

    Args:
      metadata: dict instance.
    """
    self._input_signal_metadata = metadata

  def SetReferenceSignalFilepath(self, filepath):
    """Sets the path to the audio track used as reference signal.

    Args:
      filepath: path to the reference audio track.
    """
    self._reference_signal_filepath = filepath

  def SetTestedSignalFilepath(self, filepath):
    """Sets the path to the audio track used as test signal.

    Args:
      filepath: path to the test audio track.
    """
    self._tested_signal_filepath = filepath

  def SetRenderSignalFilepath(self, filepath):
    """Sets the path to the audio track used as render signal.

    Args:
      filepath: path to the test audio track.
    """
    self._render_signal_filepath = filepath

  def Run(self, output_path):
    """Extracts the score for the set test data pair.

    Args:
      output_path: path to the directory where the output is written.
    """
    self._output_filepath = os.path.join(
        output_path, self._score_filename_prefix + self.NAME + '.txt')
    try:
      # If the score has already been computed, load.
      self._LoadScore()
      logging.debug('score found and loaded')
    except IOError:
      # Compute the score.
      logging.debug('score not found, compute')
      self._Run(output_path)

  def _Run(self, output_path):
    # Abstract method.
    raise NotImplementedError()

  def _LoadReferenceSignal(self):
    assert self._reference_signal_filepath is not None
    self._reference_signal = signal_processing.SignalProcessingUtils.LoadWav(
        self._reference_signal_filepath)

  def _LoadTestedSignal(self):
    assert self._tested_signal_filepath is not None
    self._tested_signal = signal_processing.SignalProcessingUtils.LoadWav(
        self._tested_signal_filepath)


  def _LoadScore(self):
    return data_access.ScoreFile.Load(self._output_filepath)

  def _SaveScore(self):
    return data_access.ScoreFile.Save(self._output_filepath, self._score)


@EvaluationScore.RegisterClass
class AudioLevelPeakScore(EvaluationScore):
  """Peak audio level score.

  Defined as the difference between the peak audio level of the tested and
  the reference signals.

  Unit: dB
  Ideal: 0 dB
  Worst case: +/-inf dB
  """

  NAME = 'audio_level_peak'

  def __init__(self, score_filename_prefix):
    EvaluationScore.__init__(self, score_filename_prefix)

  def _Run(self, output_path):
    self._LoadReferenceSignal()
    self._LoadTestedSignal()
    self._score = self._tested_signal.dBFS - self._reference_signal.dBFS
    self._SaveScore()


@EvaluationScore.RegisterClass
class MeanAudioLevelScore(EvaluationScore):
  """Mean audio level score.

  Defined as the difference between the mean audio level of the tested and
  the reference signals.

  Unit: dB
  Ideal: 0 dB
  Worst case: +/-inf dB
  """

  NAME = 'audio_level_mean'

  def __init__(self, score_filename_prefix):
    EvaluationScore.__init__(self, score_filename_prefix)

  def _Run(self, output_path):
    self._LoadReferenceSignal()
    self._LoadTestedSignal()

    dbfs_diffs_sum = 0.0
    seconds = min(len(self._tested_signal), len(self._reference_signal)) // 1000
    for t in range(seconds):
      t0 = t * seconds
      t1 = t0 + seconds
      dbfs_diffs_sum += (
        self._tested_signal[t0:t1].dBFS - self._reference_signal[t0:t1].dBFS)
    self._score = dbfs_diffs_sum / float(seconds)
    self._SaveScore()


@EvaluationScore.RegisterClass
class EchoMetric(EvaluationScore):
  """Echo score.

  Proportion of detected echo.

  Unit: ratio
  Ideal: 0
  Worst case: 1
  """

  NAME = 'echo_metric'

  def __init__(self, score_filename_prefix, echo_detector_bin_filepath):
    EvaluationScore.__init__(self, score_filename_prefix)

    # POLQA binary file path.
    self._echo_detector_bin_filepath = echo_detector_bin_filepath
    if not os.path.exists(self._echo_detector_bin_filepath):
      logging.error('cannot find EchoMetric tool binary file')
      raise exceptions.FileNotFoundError()

    self._echo_detector_bin_path, _ = os.path.split(
        self._echo_detector_bin_filepath)

  def _Run(self, output_path):
    echo_detector_out_filepath = os.path.join(output_path, 'echo_detector.out')
    if os.path.exists(echo_detector_out_filepath):
      os.unlink(echo_detector_out_filepath)

    logging.debug("Render signal filepath: %s", self._render_signal_filepath)
    if not os.path.exists(self._render_signal_filepath):
      logging.error("Render input required for evaluating the echo metric.")

    args = [
        self._echo_detector_bin_filepath,
        '--output_file', echo_detector_out_filepath,
        '--',
        '-i', self._tested_signal_filepath,
        '-ri', self._render_signal_filepath
    ]
    logging.debug(' '.join(args))
    subprocess.call(args, cwd=self._echo_detector_bin_path)

    # Parse Echo detector tool output and extract the score.
    self._score = self._ParseOutputFile(echo_detector_out_filepath)
    self._SaveScore()

  @classmethod
  def _ParseOutputFile(cls, echo_metric_file_path):
    """
    Parses the POLQA tool output formatted as a table ('-t' option).

    Args:
      polqa_out_filepath: path to the POLQA tool output file.

    Returns:
      The score as a number in [0, 1].
    """
    with open(echo_metric_file_path) as f:
      return float(f.read())

@EvaluationScore.RegisterClass
class PolqaScore(EvaluationScore):
  """POLQA score.

  See http://www.polqa.info/.

  Unit: MOS
  Ideal: 4.5
  Worst case: 1.0
  """

  NAME = 'polqa'

  def __init__(self, score_filename_prefix, polqa_bin_filepath):
    EvaluationScore.__init__(self, score_filename_prefix)

    # POLQA binary file path.
    self._polqa_bin_filepath = polqa_bin_filepath
    if not os.path.exists(self._polqa_bin_filepath):
      logging.error('cannot find POLQA tool binary file')
      raise exceptions.FileNotFoundError()

    # Path to the POLQA directory with binary and license files.
    self._polqa_tool_path, _ = os.path.split(self._polqa_bin_filepath)

  def _Run(self, output_path):
    polqa_out_filepath = os.path.join(output_path, 'polqa.out')
    if os.path.exists(polqa_out_filepath):
      os.unlink(polqa_out_filepath)

    args = [
        self._polqa_bin_filepath, '-t', '-q', '-Overwrite',
        '-Ref', self._reference_signal_filepath,
        '-Test', self._tested_signal_filepath,
        '-LC', 'NB',
        '-Out', polqa_out_filepath,
    ]
    logging.debug(' '.join(args))
    subprocess.call(args, cwd=self._polqa_tool_path)

    # Parse POLQA tool output and extract the score.
    polqa_output = self._ParseOutputFile(polqa_out_filepath)
    self._score = float(polqa_output['PolqaScore'])

    self._SaveScore()

  @classmethod
  def _ParseOutputFile(cls, polqa_out_filepath):
    """
    Parses the POLQA tool output formatted as a table ('-t' option).

    Args:
      polqa_out_filepath: path to the POLQA tool output file.

    Returns:
      A dict.
    """
    data = []
    with open(polqa_out_filepath) as f:
      for line in f:
        line = line.strip()
        if len(line) == 0 or line.startswith('*'):
          # Ignore comments.
          continue
        # Read fields.
        data.append(re.split(r'\t+', line))

    # Two rows expected (header and values).
    assert len(data) == 2, 'Cannot parse POLQA output'
    number_of_fields = len(data[0])
    assert number_of_fields == len(data[1])

    # Build and return a dictionary with field names (header) as keys and the
    # corresponding field values as values.
    return {data[0][index]: data[1][index] for index in range(number_of_fields)}


@EvaluationScore.RegisterClass
class TotalHarmonicDistorsionScore(EvaluationScore):
  """Total harmonic distorsion plus noise score.

  Total harmonic distorsion plus noise score.
  See "https://en.wikipedia.org/wiki/Total_harmonic_distortion#THD.2BN".

  Unit: -.
  Ideal: 0.
  Worst case: +inf
  """

  NAME = 'thd'

  def __init__(self, score_filename_prefix):
    EvaluationScore.__init__(self, score_filename_prefix)
    self._input_frequency = None

  def _Run(self, output_path):
    self._CheckInputSignal()

    self._LoadTestedSignal()
    if self._tested_signal.channels != 1:
      raise exceptions.EvaluationScoreException(
          'unsupported number of channels')
    samples = signal_processing.SignalProcessingUtils.AudioSegmentToRawData(
        self._tested_signal)

    # Init.
    num_samples = len(samples)
    duration = len(self._tested_signal) / 1000.0
    scaling = 2.0 / num_samples
    max_freq = self._tested_signal.frame_rate / 2
    f0_freq = float(self._input_frequency)
    t = np.linspace(0, duration, num_samples)

    # Analyze harmonics.
    b_terms = []
    n = 1
    while f0_freq * n < max_freq:
      x_n = np.sum(samples * np.sin(2.0 * np.pi * n * f0_freq * t)) * scaling
      y_n = np.sum(samples * np.cos(2.0 * np.pi * n * f0_freq * t)) * scaling
      b_terms.append(np.sqrt(x_n**2 + y_n**2))
      n += 1

    output_without_fundamental = samples - b_terms[0] * np.sin(
        2.0 * np.pi * f0_freq * t)
    distortion_and_noise = np.sqrt(np.sum(
        output_without_fundamental**2) * np.pi * scaling)

    # TODO(alessiob): Fix or remove if not needed.
    # thd = np.sqrt(np.sum(b_terms[1:]**2)) / b_terms[0]

    # TODO(alessiob): Check the range of |thd_plus_noise| and update the class
    # docstring above if accordingly.
    thd_plus_noise = distortion_and_noise / b_terms[0]

    self._score = thd_plus_noise
    self._SaveScore()

  def _CheckInputSignal(self):
    # Check input signal and get properties.
    try:
      if self._input_signal_metadata['signal'] != 'pure_tone':
        raise exceptions.EvaluationScoreException(
            'The THD score requires a pure tone as input signal')
      self._input_frequency = self._input_signal_metadata['frequency']
      if self._input_signal_metadata['test_data_gen_name'] != 'identity' or (
          self._input_signal_metadata['test_data_gen_config'] != 'default'):
        raise exceptions.EvaluationScoreException(
            'The THD score cannot be used with any test data generator other '
            'than "identity"')
    except TypeError:
      raise exceptions.EvaluationScoreException(
          'The THD score requires an input signal with associated metadata')
    except KeyError:
      raise exceptions.EvaluationScoreException(
          'Invalid input signal metadata to compute the THD score')
